
Add the client-side subscriptions/listen driver#3047

Open
maxisbey wants to merge 7 commits into listen-client-plumbing from listen-client-driver

Conversation

@maxisbey

@maxisbey maxisbey commented Jul 1, 2026

Contributor

Stacked on #3046 (the request-id and cancellation plumbing) — review this one for the driver only; the base branch carries the transport work.

The client half of subscriptions/listen (2026-07-28, SEP-2575), shaped after the ecosystem's own streaming idioms: one context manager, async for consumption, the acknowledgment as state on the handle rather than an event in the stream.

```python
async with client.listen(tools_list_changed=True, resource_subscriptions=["note://todo"]) as sub:
    print(sub.honored)  # the subset the server agreed to deliver
    async for event in sub:
        match event:
            case ToolsListChanged():
                tools = await client.list_tools()
            case ResourceUpdated(uri=uri):
                body = await client.read_resource(uri)
```

Motivation and Context

On the 2026 wire the response to subscriptions/listen is the notification stream. The server half shipped in #3035; until now a client had to hand-roll a message_handler, park a raw send_request in a task, and demultiplex _meta subscription ids by hand (see the old examples/stories/subscriptions/client.py for what that looked like). The driver folds all of it into structured concurrency:

  • Entering waits for the acknowledgment (spec: the ack MUST be the stream's first message), so sub.honored — the filter subset the server agreed to deliver — is always populated, making the spec's "client SHOULD check the acknowledged filter" a plain attribute read. Pre-ack failures raise from enter (server rejection as MCPError, a stream that dies unacknowledged as SubscriptionLost); nothing degrades silently.
  • Iteration yields typed events — the same four frozen dataclasses the server publishes, now defined once in mcp/shared/subscriptions.py and re-exported from the server module (no import breaks). Events are level triggers with no payload beyond identity, so unconsumed duplicates deduplicate and the backlog is bounded by the filter's width by construction — no cap, no overflow policy.
  • The stream's two endings are control flow, mirroring the spec's clean-end-vs-abrupt-drop distinction: the server's empty result ends the async for; a drop raises SubscriptionLost. No replay, no automatic re-listen — a reconnecting client refetches, which the docs show as a four-line loop.
  • Exiting the context ends the subscription with the transport's own spelling via the base branch's plumbing: the POST's response stream is aborted over streamable HTTP (that wire's cancellation signal), notifications/cancelled is sent on stream transports. The driver itself is transport-blind.
  • Multiple subscriptions demultiplex independently by their verbatim subscription ids; tool calls run freely beside open streams — the docs cover the watcher-task pattern explicitly.
  • Client.listen() on a pre-2026 connection raises a typed ListenNotSupportedError steering to the legacy verbs (TypeScript SDK parity), and subscribe_resource/unsubscribe_resource now carry deprecation warnings on both Client and ClientSession, with a migration-guide entry.
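
The dedupe claim in the list above can be made concrete with a small sketch. It is not the driver's code: `PendingEvents` and its methods are hypothetical names, and the two event classes are local stand-ins for the SDK's frozen dataclasses. The idea it illustrates is that a level trigger carries nothing beyond its identity, so an equal event that is still pending adds no information and the backlog cannot grow past the number of distinct events the filter admits.

```python
import asyncio
from dataclasses import dataclass


# Stand-ins for the SDK's frozen event dataclasses (names taken from the PR
# text; these definitions are illustrative, not the SDK's source).
@dataclass(frozen=True)
class ToolsListChanged:
    pass


@dataclass(frozen=True)
class ResourceUpdated:
    uri: str


class PendingEvents:
    """Hypothetical backlog: pending duplicates collapse into one entry."""

    def __init__(self) -> None:
        # A dict keeps insertion order and gives O(1) membership checks.
        self._pending: dict[object, None] = {}
        self._wakeup = asyncio.Event()

    def push(self, event: object) -> None:
        # Frozen dataclasses hash by value, so an equal pending event is a no-op.
        self._pending.setdefault(event, None)
        self._wakeup.set()

    async def pop(self) -> object:
        while not self._pending:
            self._wakeup.clear()
            await self._wakeup.wait()
        event = next(iter(self._pending))
        del self._pending[event]
        return event

    def __len__(self) -> int:
        return len(self._pending)


async def demo() -> list[object]:
    backlog = PendingEvents()
    for event in (
        ToolsListChanged(),
        ResourceUpdated("note://todo"),
        ToolsListChanged(),  # duplicate of a pending event
        ResourceUpdated("note://todo"),  # duplicate of a pending event
    ):
        backlog.push(event)
    return [await backlog.pop() for _ in range(len(backlog))]


drained = asyncio.run(demo())
```

Four pushes leave two pending entries, which is why the consumer only ever needs to refetch once per distinct cue.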

Session-level mechanics worth a reviewer's attention: acks are consumed only for ids in the driver's minted namespace, so raw escape-hatch listens (driving send_request directly) still observe their acks through message_handler; delivered events are handed to the consumer after the message_handler tee, so the caching layer's eviction always completes before an event-triggered refetch can run; and session teardown settles every open route, so a watcher task in a sibling task group gets SubscriptionLost instead of hanging when the client closes. One transport fix rides along: a per-request SSE stream that drops without ever carrying an event id now resolves its request with CONNECTION_CLOSED instead of leaving the waiter pending for the session's lifetime.
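
As a rough illustration of two of the mechanics in the paragraph above (frames for ids the driver does not own are left alone, and teardown settles every open route), here is a minimal sketch. `Routes`, `deliver`, `settle_all` and the sentinel are hypothetical names invented for the example, and `SubscriptionLost` is a local stand-in for the SDK exception; the real session code may be structured quite differently.

```python
import asyncio


class SubscriptionLost(Exception):
    """Stand-in for the SDK exception named in the PR."""


_LOST = object()  # sentinel: the route ended abruptly


class Routes:
    """Hypothetical registry mapping subscription ids to per-consumer queues."""

    def __init__(self) -> None:
        self._routes: dict[str, asyncio.Queue] = {}

    def open(self, sub_id: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._routes[sub_id] = queue
        return queue

    def deliver(self, sub_id: str, event: object) -> bool:
        # Frames for ids the driver does not own are left to other handlers.
        queue = self._routes.get(sub_id)
        if queue is None:
            return False
        queue.put_nowait(event)
        return True

    def settle_all(self) -> None:
        # On teardown, wake every consumer instead of leaving it parked.
        for queue in self._routes.values():
            queue.put_nowait(_LOST)
        self._routes.clear()


async def watch(queue: asyncio.Queue, seen: list) -> None:
    while True:
        item = await queue.get()
        if item is _LOST:
            raise SubscriptionLost("session closed")
        seen.append(item)


async def demo() -> tuple[list, bool, bool]:
    routes = Routes()
    seen: list = []
    watcher = asyncio.create_task(watch(routes.open("sub-1"), seen))
    routes.deliver("sub-1", "tools-changed")
    foreign = routes.deliver("raw-7", "ack")  # not a driver route
    await asyncio.sleep(0)  # let the watcher drain its queue
    routes.settle_all()
    try:
        await watcher
    except SubscriptionLost:
        return seen, foreign, True
    return seen, foreign, False


seen, foreign_consumed, lost = asyncio.run(demo())
```

The watcher task ends with an exception rather than waiting forever, which is the property the teardown test in this PR is described as checking.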

How Has This Been Tested?

  • Full suite: 100.00% line+branch coverage, pyright clean.
  • 24 driver tests (in-process, public API): ack surfacing, all four event kinds, dedupe, graceful/lost/local endings, era steer, rejection-from-enter, degenerate immediate-result open, server-sent cancel pre- and post-ack, slot release on exit, concurrent demux, tee ordering, ack-timeout, read-timeout exemption, duplicate acks, teardown-unblocks-sibling-consumer, raw-id collision containment, retained-handle staleness, bare-string filter rejection.
  • 4 interaction tests across the connect matrix (in-memory + streamable HTTP at 2026; the era guard on the 2025 cells) with requirement-manifest rows.
  • Docs: the subscriptions page gained a client section backed by a tested snippet; the subscriptions story is rewritten from ~95 lines of hand-rolled demux to the driver and runs across its matrix.
  • Manually against a real uvicorn server through the public Client: honored filter round-trip, both event kinds, exact-URI filter silence, exit-frees-slot proven by a second listen on the same session, refetch-after-change, and the legacy steer.

Breaking Changes

None. The event types moved to mcp/shared/subscriptions.py but remain importable from mcp.server.subscriptions; the deprecated subscription verbs keep working against 2025-era servers.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Documentation update

Checklist

  • I have read the MCP Documentation
  • My code follows the repository's style guidelines
  • New and existing tests pass locally
  • I have added appropriate error handling
  • I have added or updated documentation as needed

Additional context

The conformance suite has no client-role listen scenarios — the upstream SEP traceability file explicitly excludes the two client-side obligations (subscription-id correlation is client-internal demux, not wire-observable; the ack check has no wire-observable definition) and the stdio client harness is tracked separately upstream. Coverage here is therefore the interaction suite plus TypeScript-SDK behavioral parity; flagging per the repo's conformance rule so the exclusion is a deliberate acceptance, not an oversight.

Deliberate omissions, for the record: no callbacks layer (the TypeScript SDK routes listen events into its notification-handler registrations; here message_handler still sees every teed frame, so a handler-style consumer remains constructible), no auto-open-at-connect, no auto-re-listen, and no close() method — the context manager is the lifecycle.
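
For readers who prefer callbacks, a handler-style layer can also be built on top of plain iteration, as an alternative to hooking `message_handler`. The sketch below is an assumption about how one might do that, not part of this PR: `dispatch`, `fake_subscription` and the handler names are hypothetical, and the event classes are local stand-ins for the SDK types.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Awaitable, Callable


@dataclass(frozen=True)
class ToolsListChanged:  # stand-in for the SDK event type
    pass


@dataclass(frozen=True)
class ResourceUpdated:  # stand-in for the SDK event type
    uri: str


Handler = Callable[[object], Awaitable[None]]


async def dispatch(events: AsyncIterator[object], handlers: dict[type, Handler]) -> None:
    """Hypothetical helper: route each event to the handler for its type."""
    async for event in events:
        handler = handlers.get(type(event))
        if handler is not None:
            await handler(event)


async def fake_subscription() -> AsyncIterator[object]:
    # Stands in for iterating `sub`; a real stream ends when the server closes.
    yield ToolsListChanged()
    yield ResourceUpdated("note://todo")


async def demo() -> list[str]:
    log: list[str] = []

    async def on_tools(event: object) -> None:
        log.append("refetch tools")

    async def on_resource(event: object) -> None:
        log.append(f"refetch {event.uri}")

    await dispatch(fake_subscription(), {ToolsListChanged: on_tools, ResourceUpdated: on_resource})
    return log


log = asyncio.run(demo())
```

Keeping this outside the driver leaves the context manager as the only lifecycle, which matches the omission described above.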

@github-actions

github-actions Bot commented Jul 1, 2026

Contributor

📚 Documentation preview

Preview https://pr-3047.mcp-python-docs.pages.dev
Deployment https://62df9a64.mcp-python-docs.pages.dev
Commit d7c16b1
Triggered by @maxisbey
Updated 2026-07-02 15:52:38 UTC

@maxisbey maxisbey marked this pull request as ready for review July 1, 2026 19:18

@cubic-dev-ai cubic-dev-ai Bot left a comment


3 issues found across 23 files


Comment thread src/mcp/client/streamable_http.py
Comment thread docs/advanced/subscriptions.md Outdated
Comment thread src/mcp/client/subscriptions.py
Comment thread src/mcp/client/subscriptions.py Outdated
Comment thread src/mcp/client/streamable_http.py Outdated
Comment thread src/mcp/client/session.py

@cubic-dev-ai cubic-dev-ai Bot left a comment


2 issues found across 16 files (changes from recent commits).

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="docs/advanced/subscriptions.md">

<violation number="1" location="docs/advanced/subscriptions.md:91">
P2: The client docs now give conflicting filter semantics: this line says a subscription may receive sub-resource URIs, while the same page says resource subscriptions match exact URI strings. Consider keeping the event description aligned with exact-match delivery so clients do not build watchers that expect broader notifications.</violation>
</file>


* `client.listen(...)` takes the filter as keyword arguments — they mirror the wire `SubscriptionFilter` field for field. Entering sends the request and returns once the server's acknowledgment arrives, so `sub.honored` (the subset the server agreed to deliver) is always there before the first event.
* Iteration yields the same four typed events the server publishes: `ToolsListChanged`, `PromptsListChanged`, `ResourcesListChanged`, and `ResourceUpdated(uri=...)` — where the URI may be a sub-resource of one you subscribed to, at the server's discretion. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one.

@cubic-dev-ai cubic-dev-ai Bot Jul 1, 2026


P2: The client docs now give conflicting filter semantics: this line says a subscription may receive sub-resource URIs, while the same page says resource subscriptions match exact URI strings. Consider keeping the event description aligned with exact-match delivery so clients do not build watchers that expect broader notifications.

<file context>
@@ -88,9 +88,9 @@ Consuming a subscription is one context manager:
 
 * `client.listen(...)` takes the filter as keyword arguments — they mirror the wire `SubscriptionFilter` field for field. Entering sends the request and returns once the server's acknowledgment arrives, so `sub.honored` (the subset the server agreed to deliver) is always there before the first event.
-* Iteration yields the same four typed events the server publishes: `ToolsListChanged`, `PromptsListChanged`, `ResourcesListChanged`, and `ResourceUpdated(uri=...)`. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one.
+* Iteration yields the same four typed events the server publishes: `ToolsListChanged`, `PromptsListChanged`, `ResourcesListChanged`, and `ResourceUpdated(uri=...)` — where the URI may be a sub-resource of one you subscribed to, at the server's discretion. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one.
 * Leaving the block ends the subscription, with the transport's own spelling: over streamable HTTP the request's response stream is closed (that is the 2026 cancellation signal), on stream transports `notifications/cancelled` is sent.
-* The stream's two endings are control flow. The server closing gracefully simply ends the `async for`; an abrupt drop raises `SubscriptionLost`. There is no replay and no automatic re-listen — a client that reconnects refetches what it depends on:
</file context>
Suggested change
-* Iteration yields the same four typed events the server publishes: `ToolsListChanged`, `PromptsListChanged`, `ResourcesListChanged`, and `ResourceUpdated(uri=...)` — where the URI may be a sub-resource of one you subscribed to, at the server's discretion. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one.
+* Iteration yields the same four typed events the server publishes: `ToolsListChanged`, `PromptsListChanged`, `ResourcesListChanged`, and `ResourceUpdated(uri=...)`. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one.

Comment thread src/mcp/client/subscriptions.py
maxisbey added 5 commits July 2, 2026 15:30

client.listen() opens one subscription as an async context manager:
entering sends the request and waits for the server's acknowledgment
(the honored filter subset is on the handle before the first event),
iteration yields the same four typed events the server publishes, a
graceful server close simply ends the loop, and an abrupt drop raises
SubscriptionLost. Exiting the context ends the subscription with the
transport's own cancellation spelling (aborting the request's stream
over streamable HTTP, notifications/cancelled on stream transports).
There is no replay and no automatic re-listen. Pending events
deduplicate - every kind is a level trigger - so the backlog is
bounded by the filter's width by construction.
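
Since there is no replay and no automatic re-listen, a watcher that wants to keep watching has to loop itself and refetch on every re-open. The following is a minimal sketch of such a loop under stated assumptions: `FakeClient` and its scripted streams are hypothetical stand-ins so the example runs on its own, `SubscriptionLost` is redefined locally, and the bounded `streams` count exists only so the demo terminates.

```python
import asyncio
from contextlib import asynccontextmanager


class SubscriptionLost(Exception):
    """Stand-in for the SDK exception named in the PR."""


class FakeClient:
    """Hypothetical stand-in for the SDK client; not the real API surface."""

    def __init__(self, script: list[tuple[list[str], str]]) -> None:
        # Each entry: the events one listen delivers, then how that stream ends.
        self._script = script
        self.opens = 0
        self.refetches = 0

    async def list_tools(self) -> list[str]:
        self.refetches += 1
        return ["echo"]

    @asynccontextmanager
    async def listen(self):
        events, ending = self._script[self.opens]
        self.opens += 1

        async def stream():
            for event in events:
                yield event
            if ending == "lost":
                raise SubscriptionLost("stream dropped")
            # "graceful": the generator simply finishes, ending the async for

        yield stream()


async def watch(client: FakeClient, streams: int) -> None:
    for _ in range(streams):  # a real watcher would loop until cancelled
        try:
            async with client.listen() as sub:
                await client.list_tools()  # refetch: changes may have been missed
                async for _event in sub:
                    await client.list_tools()
        except SubscriptionLost:
            pass  # abrupt drop: fall through and re-listen
        # a graceful close also falls through and re-listens


client = FakeClient([(["tools-changed"], "lost"), (["tools-changed"], "graceful")])
asyncio.run(watch(client, streams=2))
```

A production loop would likely add a delay or backoff between attempts; that is omitted here to keep the control flow visible.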

The event vocabulary moves to mcp/shared/subscriptions.py (re-exported
from the server module) so both sides speak the same types. The session
demultiplexes stream frames by the _meta subscription id: acks in the
driver's id namespace are consumed (raw escape-hatch listens keep
observing theirs through message_handler), change events are delivered
after the message_handler tee so cache eviction always completes before
an event-triggered refetch can run, and session teardown settles every
open route so a watcher task in a sibling task group cannot hang when
the client closes. A per-request SSE stream that drops without ever
carrying an event id now resolves its request with CONNECTION_CLOSED
instead of leaving it pending for the session's lifetime.

The legacy subscribe_resource/unsubscribe_resource verbs are deprecated
on both Client and ClientSession with a pointer at the replacement.

The docs-example checker pipes snippets to ruff in the platform
encoding; an em-dash in a code comment arrives as invalid UTF-8 on
Windows runners.

Move the client's listen-stream demux to a synchronous notification
intercept on the dispatcher's receive path, where cancelled/progress
interception already lives: acks, events, and teardown signals now
advance in wire order relative to the listen result, so a graceful
close can no longer outrun the events that preceded it or clobber the
acknowledged filter with the fabricated empty one. Ack consumption
keys on the live route registry alone (the per-session id set is
gone), route admission enforces the honored filter - loose on URIs,
which the spec lets be sub-resources, with a capped backlog settling
the route lost against floods - and the response-cache eviction
barrier moved to the consumption point, held pending until it
completes so a cancelled consumer cannot lose a level trigger.

On the transport, a request whose SSE stream can never answer - a
non-resumable drop or an exhausted reconnection budget - now resolves
with a synthesized error, contained against teardown races.

Docs: a graceful close is not "stop watching" (servers shed load by
closing gracefully, including this SDK's ListenHandler); the watch
loop re-listens on both endings, and sub.honored is the delivery
contract, not catalog state.

Comment-only pass over the branch's additions: keep the non-inferable
invariants and motivations (receive-order rationale, sub-resource
admission, the wake-snapshot race, peek/commit semantics) at one to
three lines each, tighten docstrings to Google style with Raises
sections kept, and drop narration the code already states. No code
changes.
@maxisbey maxisbey force-pushed the listen-client-plumbing branch from 469fdb2 to 69e6cd3 Compare July 2, 2026 15:35
@maxisbey maxisbey force-pushed the listen-client-driver branch from 4c95899 to 6573774 Compare July 2, 2026 15:35

@claude claude Bot left a comment

Contributor


The regressions I flagged earlier (the wire-order delivery race, the exhausted-reconnection hang) look properly resolved in the follow-up commits, and I found no new issues on this revision — but this is a substantial new public API (the listen driver, a Dispatcher protocol change, transport behavior changes), so it still warrants a human maintainer's review.

Extended reasoning...

Overview

This PR adds the client half of subscriptions/listen (SEP-2575, 2026-07-28): a new mcp/client/subscriptions.py driver exposed as Client.listen(), a shared event vocabulary in mcp/shared/subscriptions.py, a new synchronous on_notify_intercept hook on the Dispatcher protocol (implemented in both JSONRPCDispatcher and DirectDispatcher), streamable-HTTP changes so abandoned per-request SSE streams resolve their waiter with CONNECTION_CLOSED, deprecation of the legacy subscribe/unsubscribe verbs, plus docs, examples, and a large test surface (~30 files).

Prior findings and their resolution

My earlier review flagged three issues; all were addressed in follow-up commits. The delivery-vs-result ordering race was fixed at the cause by moving listen-route bookkeeping onto a synchronous receive-path intercept (with a regression test wiring the exact [ack, event, result] burst against a parked message_handler). The reconnection give-up path now resolves the original request via _resolve_abandoned_request, covering the hang class on the id-bearing branch too. The unbounded _driver_listen_ids set was removed entirely, with ack consumption keyed on the live route registry. I re-checked the current code for these and found no new issues introduced by the fixes.

Security risks

Low. No auth, crypto, or permission logic is touched. The client-side route admission filters events against the acknowledged filter, and a hostile peer flooding distinct URIs is bounded by the _MAX_PENDING_EVENTS backstop (settling the subscription lost rather than growing memory). The intercept hook contains exceptions so a raising intercept cannot take down the receive loop.
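
The admission-plus-backstop behaviour described above might look roughly like the sketch below. It is an illustration only: `Route`, `admit` and the cap value are hypothetical (the actual value of `_MAX_PENDING_EVENTS` is not stated in this thread), event kinds are reduced to strings, and URI matching is deliberately left out because its exact semantics are still under discussion in the review.

```python
from dataclasses import dataclass, field

MAX_PENDING = 4  # hypothetical cap; the SDK's real backstop value is not given here


@dataclass
class Route:
    honored_kinds: frozenset[str]
    pending: dict[tuple[str, str], None] = field(default_factory=dict)
    lost: bool = False

    def admit(self, kind: str, uri: str = "") -> bool:
        """Return True if the event was queued or merged into a pending duplicate."""
        if self.lost or kind not in self.honored_kinds:
            return False  # outside the acknowledged filter, or route already settled
        key = (kind, uri)
        if key not in self.pending and len(self.pending) >= MAX_PENDING:
            # A flood of distinct URIs: settle the route lost rather than grow.
            self.lost = True
            return False
        self.pending[key] = None
        return True


route = Route(honored_kinds=frozenset({"resource_updated"}))
rejected = route.admit("tools_list_changed")  # kind was not acknowledged
accepted = [route.admit("resource_updated", f"note://{i}") for i in range(MAX_PENDING)]
duplicate = route.admit("resource_updated", "note://0")  # merges, no growth
overflow = route.admit("resource_updated", "note://flood")
```

Duplicates never count against the cap, so only a peer sending many distinct identities can trip it.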

Level of scrutiny

High. This changes the Dispatcher protocol signature (a cross-cutting seam every dispatcher implementation and several test doubles had to follow), adds a new public API surface with documented semantics (graceful close vs SubscriptionLost, honored-filter contract), and modifies the streamable-HTTP transport's reconnection/abandonment behavior — all production-critical client paths. This is exactly the kind of design-bearing change a human maintainer should sign off on, regardless of test coverage.

Other factors

Test coverage is thorough (24 driver tests, dispatcher-contract tests for the new intercept, interaction-matrix tests, tested docs snippets), coverage is at 100%, and the PR description explicitly flags the conformance-suite gap as a deliberate acceptance. The remaining unresolved cubic comment about sub-resource URI wording in the docs is minor. Given the scope and the protocol-level design decisions involved, I am deferring rather than shadow-approving.
